Sync before and after deleting #2268
Conversation
@pplantinga I checked and it's not solving my issue (the PR made by Adel is). As you can see, it hangs at the end of an epoch, once validation is done and it wants to go into the next one.
Looks like the issue here is an […]
Okay, this is ready for review again @TParcollet @Adel-Moumen — basically I propose replacing instances of […]
# Sync before deleting to avoid another process saving at the same time.
# This has led to errors as documented here:
# https://github.com/speechbrain/speechbrain/issues/2250
ddp_barrier()
@pplantinga what will happen if torch_recovery is called outside of run_on_main? This barrier would be hit, and MAIN_PROC_ENV wouldn't be 1?
Outside of run_on_main the program should be operating with multiple processes, so all of them should hit the barrier together. The only scenario where it would still freeze is if you are inside an if if_main_process(): block, which we should discourage use of.
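The deadlock scenario described above can be simulated without real DDP. In this toy sketch, threads stand in for the distributed processes and threading.Barrier stands in for torch.distributed.barrier(); the names and structure are illustrative, not SpeechBrain's actual code. When only rank 0 reaches the barrier (as happens if ddp_barrier() is called inside an if if_main_process(): block), it waits forever — here the timeout surfaces the hang as an error:

```python
import threading

# Two "processes"; the timeout turns a would-be deadlock into an exception.
barrier = threading.Barrier(2, timeout=0.5)
results = {}

def worker(rank):
    # Buggy pattern: only rank 0 hits the barrier, mirroring a ddp_barrier()
    # call placed inside an if if_main_process(): block.
    try:
        if rank == 0:
            barrier.wait()  # rank 1 never arrives -> BrokenBarrierError
        results[rank] = "ok"
    except threading.BrokenBarrierError:
        results[rank] = "stuck"

threads = [threading.Thread(target=worker, args=(r,)) for r in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With real torch.distributed there is no timeout by default, so rank 0 simply hangs — which matches the "freezes at the end of an epoch" symptom reported above.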
Sounds good.
I think a solution could be developed to catch those bugs where you branch based on the main process, but inside that branch you call some code which should hit a DDP barrier. This will not automatically solve the problems, but it should help catch bugs. It would replace if_main_process() (almost drop-in, just adds indentation):
BARRIER_PROTECTOR = "SPEECHBRAIN_DDP_BARRIER_PROTECTOR"
os.environ[BARRIER_PROTECTOR] = "0"  # environment values must be strings

class DDPProtector(object):
    """Protects from running into a DDP barrier in a code block that has already branched."""

    def __enter__(self):
        # Increment so that we can support nested protectors
        os.environ[BARRIER_PROTECTOR] = str(int(os.environ[BARRIER_PROTECTOR]) + 1)
        return self  # needed for "with DDPProtector() as protector"

    def on_main_process(self):
        # ...there would be a check here...
        return True  # placeholder: True if on main process, else False

    def __exit__(self, exception_type, exception_value, traceback):
        # Always decrement, even if the block raised an exception
        os.environ[BARRIER_PROTECTOR] = str(int(os.environ[BARRIER_PROTECTOR]) - 1)
        return False  # do not suppress exceptions

def ddp_barrier():
    """In DDP mode, this function will synchronize all processes.

    torch.distributed.barrier() will block processes until the whole
    group enters this function.
    """
    if int(os.environ[BARRIER_PROTECTOR]) > 0:
        raise RuntimeError(
            "DDP barrier inside a main-process-only branch; this will "
            "create a deadlock or a subtle bug."
        )
    # Check if we're in a single-process section; if so, skip the barrier
    elif os.environ.get(MAIN_PROC_ENV, "0") == "1":
        return
    elif torch.distributed.is_initialized():
        torch.distributed.barrier()
This would simply be used to mark that you do not intend to run into DDP barriers in this part of the code:

with DDPProtector() as protector:
    if protector.on_main_process():
        ...

So when if_main_process() is replaced by this, we should catch some bugs more easily.
It fixes my bug now. We still need Ryan to verify whether it solves issue #2250 as well.
I think at least the post_func logic should be checked. I also left a suggestion, which could help catch some bugs.
speechbrain/utils/distributed.py
Outdated
else:
    # But main comes here
    main_process_only(post_func)(*post_args, **post_kwargs)
I think the logic is now inverted, post_func is meant to be run on everything else except main (e.g. load a tokenizer that was just created). With run_post_on_main, post_func is also run on main.
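The intended semantics can be sketched with a simplified, hypothetical stand-in for run_on_main (the real function in speechbrain/utils/distributed.py takes more arguments and performs the actual DDP synchronization): func runs only on the main process, while post_func runs only on the other ranks, e.g. loading a tokenizer that rank 0 just created.

```python
def run_on_main_sketch(func, rank, post_func=None):
    """Simplified sketch: func runs only on rank 0 (the main process);
    post_func runs only on the remaining ranks, e.g. to load an artifact
    (such as a tokenizer) that rank 0 just finished creating."""
    if rank == 0:
        return func()
    if post_func is not None:
        return post_func()
    return None
```

The inverted version being flagged above would have also run post_func on rank 0 — causing, for example, the main process to try to load a tokenizer it is itself still writing.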
Aha, you are totally right about this... I'll go ahead and fix this.
Should be fixed in latest commit
speechbrain/utils/distributed.py
Outdated
""" | ||
import datetime | ||
import os | ||
import torch | ||
from functools import wraps | ||
|
||
MAIN_PROC_ENV = "MAIN_PROC_ONLY" |
Perhaps this should have a SPEECHBRAIN_ prefix just in case.
Changed to module-level variable, which makes this unnecessary
@pplantinga what do you think of @Gastron's comments?
My take is that the point of this PR is to fix bugs. We should merge it with the fixes so that unstable can be merged into develop, and then we can PR @Gastron's idea into dev?
speechbrain/utils/distributed.py
Outdated
@@ -103,8 +94,13 @@ def main_process_only(function):
    @wraps(function)
    def main_proc_wrapped_func(*args, **kwargs):
        """This decorated function runs only if this is the main process."""
        os.environ[MAIN_PROC_ENV] = "1"
Additionally, I wonder if environment variables (like MAIN_PROC_ENV here) are the right way to do this sort of process-wide communication. I think something like a variable in a module (Python modules are singletons) should be enough here. So instead of this, I think we could just have:

MAIN_PROC_FLAG = 0

def main_proc_wrapped_func(*args, **kwargs):
    global MAIN_PROC_FLAG
    MAIN_PROC_FLAG = 1
    ...
    MAIN_PROC_FLAG = 0

def ddp_barrier():
    # Note: as long as this doesn't locally redefine MAIN_PROC_FLAG,
    # it doesn't need to be marked as global, since it is only read here,
    # not mutated.
    if MAIN_PROC_FLAG == 1:
        ...
Ah yes, a module-level flag is better here.
You have some nice suggestions here @Gastron. You're right that the current setup would fail if there's a […]. I was trying to avoid having double indentation for this scenario, and the […]
Perhaps we could encourage all user code to use run_on_main; then any branching flags or counters or such things can be implemented behind the scenes in library code. In the short term, I think forcing run_on_main everywhere (getting rid of if_main_process) would mean a bigger refactor, since the local code (in the if if_main_process(): block) would need to be moved into a new function or other callable.
@Gastron and @pplantinga, just to clarify one thing: if we keep using if_main_process (and we should — I agree with Aku), it will still work with this PR, right? I don't see any reason why we must replace all the run_on_main calls. So basically, Mirco wants to release unstable into dev this week, so we need to settle this PR. We either revert with Adel's PR or we move forward with this one. @Gastron, what is your opinion on merging this code and opening a new PR to develop your idea? I like the context manager if we can have something as simple as if_main_process. @pplantinga, if we merge, could you confirm that existing if_main_process calls will still work, or will this break them and force us to change all of them?
In this PR, […]
@pplantinga could we briefly assess what is impacted by this deadly combination? If it's minor, we could easily fix it on dev.
I don't understand why this is a problem introduced by this PR, though. if_main_process was always incompatible with a DDP barrier, no? So we should not see that combination anywhere?
Yes, […]
I think we can merge into dev and then move forward with a proper discussion between the people interested, to work toward better handling of DDP barriers.
@Gastron do you agree? If so, we merge and plan a meeting next week (if you are available) to solve this design issue properly.
I think improving this incrementally makes sense; I guess we can indeed merge this and make further improvements soonish.
Thank you all for working on this. Based on what has been discussed here, I'm going to merge it. After that, you can discuss better solutions and implement them in another PR.
To prevent the error in #2250, this PR adds a barrier before and after deletion so that no process can write at the same time as the deletion.
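The shape of the fix can be sketched as follows. This is a hypothetical delete_checkpoint helper, not the PR's actual diff (which edits SpeechBrain's checkpointer); the barrier is a no-op unless torch.distributed has been initialized, so the code also runs single-process:

```python
import os

try:
    import torch.distributed as dist
except ImportError:  # torch may not be installed; barrier becomes a no-op
    dist = None

def ddp_barrier():
    """Synchronize all DDP processes; no-op outside an initialized group."""
    if dist is not None and dist.is_available() and dist.is_initialized():
        dist.barrier()

def delete_checkpoint(path):
    # Sync before deleting: no process may still be saving this checkpoint.
    ddp_barrier()
    if os.path.exists(path):
        os.remove(path)
    # Sync after deleting: no process starts a new save mid-deletion.
    ddp_barrier()
```

Bracketing the removal with two barriers is what rules out the race in #2250: the first barrier ensures every pending save has finished before anything is deleted, and the second ensures the deletion is complete before any process begins writing again.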